Airflow2.1.1实战中踩过的坑总结!!

您所在的位置:网站首页 airflow web ui Airflow2.1.1实战中踩过的坑总结!!

Airflow2.1.1实战中踩过的坑总结!!

#Airflow2.1.1实战中踩过的坑总结!!| 来源: 网络整理| 查看: 265

    从初次接触Airflow到正式使用已经过取一个月,对Airflow也有了一个最基本的了解.使用过程中,踩了不少坑. 同时根据实际使用情况对airflow做了一些优化,现在把踩过的坑分享给大家,共勉!

    博客列举的所有问题均是自己亲身经历,如有不严谨或者不正确的地方,烦请指出. 遇到什么问题或者有什么心得也欢迎与我交流!

注: 本博客使用的Airflow版本为2.1.1,细节之处可能与其它版本有所出入!

文章目录 一. Airflow实战中踩过的坑1. 后台上传dag airflow webui刷新延时问题2. 依赖相关问题3. airflow dag不能编译执行的问题4. dag手动调度不触发的问题4.1 界面手动触发`dag`,`dag`不执行,并且处于`none`状态4.2 界面手动触发`dag`,`dag`不执行,并且处于`queued`状态 5. mysql连接问题6. sqoop任务中参数解析异常问题7. airflow调度中无法访问s38. signals SIGTERM to subprocesses unexpectedly9. 路径问题9.1 相对路径引发的问题 10. Dag Id不是全局唯一引发的问题11. 分布式部署场景下,dag执行失败,日志无法正常查看的问题12. 任务停止后,仍然存在大量进程,最终worker进程挂掉,airflow无法正常工作13. dag或者task命名不符合airflow规范

一. Airflow实战中踩过的坑 1. 后台上传dag airflow webui刷新延时问题

具体表现: 将airflow dag代码上传至后台,等了好长时间airflow web界面都没有刷新

原因一: airflow.cfg文件中worker_refresh_interval默认时间间隔为6000. 时间较长,将该参数修改为10,即可编写dag之后很快刷新

原因二: dag没有放置在airflow.cfg dags_folder配置指定的目录下 请检查airflow.cfg dags_folder配置,确定文件上传路径是否有误

原因三: dag编写时后缀不是.py导致airflow无法执行 dag代码存在问题. 举例: 如下图所示,因为dag存在编码问题导致不能被识别 在这里插入图片描述

2. 依赖相关问题

airflow导入dag报错诸如如下:

No module named 'airflow.providers.apache.sqoop'

原因:缺乏airflow sqoop相关依赖,使用如下命令安装即可,缺乏其它依赖也是同理

AIRFLOW_VERSION=2.1.1 PYTHON_VERSION="$(python3.7 --version | cut -d " " -f 2 | cut -d "." -f 1-2)" CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-no-providers-${PYTHON_VERSION}.txt" pip3.7 install "apache-airflow[sqoop]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

注意: 要指定airflow版本,并且要使用正确的pip命令安装, 比如说我机器对应的的是pip3.7 在这里插入图片描述 否则可能会卸载掉当前版本的airflow

如下图(使用自带的pip命令, pip install airflow.providers.apache.sqoop就导致当前版本的airflow被卸载了 ) 在这里插入图片描述

3. airflow dag不能编译执行的问题

原因: 应用方使用的python环境有问题,airflow2.1.1配置的python环境是python3.7 对应的目录为/usr/python3.7/bin 编译执行应该使用的命令为python3.7 ***.py 如下图所示:在这里插入图片描述

而不是默认的python ***.py 如果有和我一样应用场景的话,请注意!

4. dag手动调度不触发的问题 4.1 界面手动触发dag,dag不执行,并且处于none状态

原因: dag处于paused状态,如下图

在这里插入图片描述

这种状态按照airflow的设计,是不会被调度的,如下图: 为airflow/jobs/scheduler_job.py源码 在这里插入图片描述

该情形下,需要界面点击,将dag状态变更为active,然后再执行调度即可 如下图所示的dag才会被airflow的scheduler调度 在这里插入图片描述

4.2 界面手动触发dag,dag不执行,并且处于queued状态

原因: 很有可能是dag id的全局唯一性没有保障

此场景主要出现在dag代码复用, 文件拷贝修改的时候没有修改dag id导致 将dag id修改为全局唯一再次触发即可

5. mysql连接问题

dag访问mysql报错: 报错详情如下:

[2021-08-04 14:07:52,688] {sqoop.py:111} INFO - b'Error: java.lang.RuntimeException: java.lang.RuntimeException: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure' [2021-08-04 14:07:52,688] {sqoop.py:111} INFO - b'' [2021-08-04 14:07:52,688] {sqoop.py:111} INFO - b'The last packet successfully received from the server was 15 milliseconds ago. The last packet sent successfully to the server was 8 milliseconds ago.' [2021-08-04 14:07:52,688] {sqoop.py:111} INFO - b'at org.apache.sqoop.mapreduce.db.DBInputFormat.setConf(DBInputFormat.java:167)' [2021-08-04 14:07:52,688] {sqoop.py:111} INFO - b'at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:76)' [2021-08-04 14:07:52,689] {sqoop.py:111} INFO - b'at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)' [2021-08-04 14:07:52,689] {sqoop.py:111} INFO - b'at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:751)' [2021-08-04 14:07:52,689] {sqoop.py:111} INFO - b'at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)' [2021-08-04 14:07:52,689] {sqoop.py:111} INFO - b'at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170)' [2021-08-04 14:07:52,689] {sqoop.py:111} INFO - b'at java.security.AccessController.doPrivileged(Native Method)' [2021-08-04 14:07:52,689] {sqoop.py:111} INFO - b'at javax.security.auth.Subject.doAs(Subject.java:422)' [2021-08-04 14:07:52,689] {sqoop.py:111} INFO - b'at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)' [2021-08-04 14:07:52,689] {sqoop.py:111} INFO - b'at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164)' [2021-08-04 14:07:52,689] {sqoop.py:111} INFO - b'Caused by: java.lang.RuntimeException: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure'

原因: SSL问题导致,在MySQL链接后面添加useSSL=false即可

6. sqoop任务中参数解析异常问题

问题表现: 使用airflow执行sqoop时, 配置的参数没有按照预期解析,导致sqoop命令执行失败 以下是Sqoop Operator对于sqoop参数的解析说明 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UkYuZfnV-1629286875752)(/tfl/pictures/202108/tapd_51752851_1628060778_36.png)]

举例: 执行sqoop,我需要引入一个参数

-Dmapreduce.job.user.classpath.first=true

但是按照上述sqoop参数解析的规则,就会变成

---Dmapreduce.job.user.classpath.first=true # 或者 --Dmapreduce.job.user.classpath.first=true

这都是不对的

最终,将该参数前置到hadoop的mapred-site.xml文件中,如下图: 在这里插入图片描述

之后重启相关服务,问题即可解决,无需再手动在脚本中指定

7. airflow调度中无法访问s3

报错场景: 我们的应用数据是存储在AWS的s3 因此需要通过airflow去访问s3, 使用过程中遇到如下报错:

ERROR tool.ImportTool: Imported Failed: Wrong FS: s3a://***, expected: hdfs://***'

原因: 访问s3需要配置相关认证,在core-site.xml中添加如下配置: 如果在脚本中引入配置,需要配置在sqoop参数的最前面,否则会造成参数无法解析,如下图: 没有将s3的认证配置放在参数最前面,导致解析异常 在这里插入图片描述

sqoop import -Dfs.s3a.access.key=... -Dfs.s3a.secret.key=... --target-dir s3a://

在这里插入图片描述

8. signals SIGTERM to subprocesses unexpectedly

相关报错如下: 在这里插入图片描述

问题原因: 任务被强行杀死,因airflow不认为线程作业仍在运行并认为主线程已完成。 解决方法,调大相关参数:

# 让airflow在所有线程完成之前等待更多时间。 killed_task_cleanup_time = 6000

从默认值60调大到6000,该问题得到解决

9. 路径问题 9.1 相对路径引发的问题

脚本里面配置相对路径,可能因为环境原因无法被识别,要配置为绝对路径. 分布式环境下,涉及到外部脚本,jar包调用,需要按照dag写法分发到全部的worker节点的相同路径下

10. Dag Id不是全局唯一引发的问题

引发的问题1,见问题4.2

问题2: 注意, Dag Id全局唯一还包括以下场景: 某个dag id暂时不用了,后面复用. 此时应该保证 之前使用该dag id的任务全部执行完成,没有在running状态,否则,会出现相互影响的情况 如以下报错,由数据库事务异常引起的问题,对应的dag id,就存在上述使用场景

sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (pymysql.err.IntegrityError) (1062, "Duplicate entry 'end-***-2021-08-04 07:02:00.000000' for key 'PRIMARY'") [parameters: ({'task_id': '***', 'dag_id': '***', 'execution_date': datetime.datetime(2021, 8, 4, 7, 2), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'airflow', 'job_id': None, 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 25, 'operator': 'PythonOperator', 'queued_dttm': None, 'queued_by_job_id': None, 'pid': None, 'executor_config': b'\x80\x04}\x94.', 'external_executor_id': None}, {'task_id': '***', 'dag_id': '***'

解决方法: 进行事务回滚,然后重启airflow scheduler

nohup airflow scheduler & 11. 分布式部署场景下,dag执行失败,日志无法正常查看的问题

报错如下图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-r8L6Fssb-1629286875773)(/tfl/pictures/202108/tapd_51752851_1628077266_8.png)]

以下是github上面有关该问题讨论的一些情况,目前看来有较大的可能性是airflow自身的bug AIRFLOW-4922 Fix task get log by Web UI

该问题相对复杂,后面可能会考虑单独写一篇博客讨论这个问题

12. 任务停止后,仍然存在大量进程,最终worker进程挂掉,airflow无法正常工作 ps -ef | grep airflow

发现后台存在大量的如下进程 在这里插入图片描述

但在airflow界面上,所有dag均是paused状态 查看上述排查出来的有大量后台进程的dag的代码, 发现了有很多类似查看Hdfs文件是否存在,不存在就sleep,这样的写法 导致一直在占用worker的计算资源不会释放.最终worker挂掉,airflow不可用

解决方案:这种情况下可采用Hdfs Sensor这种方式解决问题,

杀掉后台相关进程,使用Hdfs Sensor改写完dag之后,该问题得到解决

13. dag或者task命名不符合airflow规范

具体表现为,界面上查看dag代码报错 在这里插入图片描述

当开启任务调度时,scheduler会立即挂掉,并且不能正常重启, 启动时报错:

airflow.exceptions.AirflowException: The key (dfsadfasdfs.cleanFiletask) has to be made of alphanumeric characters, dashes, dots and underscores exclusively

从报错中可以得知,是task命名不符合规范,airflow task命名只支持数字,字母,下划线其余的字符都会被认定为非法

解决方案: 按照airflow规范命名即可



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3